package com.hanxiaozhang.tx.listener;

import com.alibaba.fastjson.JSONObject;
import com.hanxiaozhang.DistributedLock;
import com.hanxiaozhang.DistributedLockFactory;
import com.hanxiaozhang.config.RedisUtil;
import com.hanxiaozhang.constant.RocketConstant;
import com.hanxiaozhang.tx.entity.OrderEntity;
import com.hanxiaozhang.tx.service.RepoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * 〈一句话功能简述〉<br>
 * 〈订单支付状态监听〉
 *
 * @author hanxinghua
 * @create 2022/10/9
 * @since 1.0.0
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketConstant.TX_ORDER_REPO_TOPIC,
        consumerGroup = RocketConstant.TX_ORDER_REPO_CONSUMER_GROUP)
public class OrderPayStatusListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {

    @Autowired
    private RepoService repoService;
    @Autowired
    private DistributedLockFactory distributedLockFactory;
    @Autowired
    private RedisUtil redisUtil;

    @Override
    public void onMessage(MessageExt messageExt) {

        // 高并发下，为了防止库存被扣成负数，使用分布式锁
        DistributedLock lock = null;
        try {
            String keys = messageExt.getKeys();
            lock = distributedLockFactory.getRedisLock(keys);
            lock.lock();
            String orderId = messageExt.getProperties().get("orderId");
            // 消费幂等性处理
            boolean hasConsume = redisUtil.hasConsume(keys, "1", 10 * 60);
            if (hasConsume) {
                log.info("订单id:[{}],重复消费", orderId);
            } else {
                OrderEntity orderEntity = JSONObject.parseObject(new String(messageExt.getBody(), "UTF-8"), OrderEntity.class);
                // 扣减库存
                int reduce = repoService.reduce(orderEntity.getBuyNum(), orderEntity.getGoodId());
                if (reduce > 0) {
                    log.info("订单id:[{}],扣减成功!", orderId);
                }
            }
        } catch (UnsupportedEncodingException e) {
            log.error("订单状态监听异常-不支持此编码转换,异常消息:{}", e);
        } catch (Exception e) {
            log.error("订单状态监听异常,异常消息:{}", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {

    }


}